CAOS-14 async & multithreading¶

Асинхронность¶

Intro¶

Давайте представим ситуацию: у вас поднят сервер, который слушает подключения на порту 8888.

И к вам подключается не 1, а сразу несколько клиентов.

Каждый из клиентов будет общаться с сервером (путем пересылки туда-обратно нескольких сообщений)

Вопрос - как нам их обрабатывать?

Перед тем, как отвечать на этот вопрос, давайте посмотрим, из чего состоит наше взаимодействие с клиентом

Ну и так далее...

Главное, то тут нужно уловить - Алиса много тупит

А сервер тратит много времени на ожидание

Возвращаемся к вопросу - как обрабатывать клиентов?

  1. По очереди
    • плюсы: просто и сердито
    • минусы: долго + большую часть веремени тратим на ожидание
  1. Параллельно (на каждого клиента создаем свой поток/процесс)
    • плюсы: просто, быстро и сердито
    • минусы:
      • большую часть времени каждый поток ждет
      • не сможем обработать большой наплыв (доступных потоков - конечное кол-во)
      • в некоторых ОС (кхм кхм Шиндоус) создание потока/процесса - трудоемкая операция
  1. А давайте придумаем, как не тратить время на ожидание

    Идея в том, чтоб во время ожидания одного клиента обрабатывать другого

Пока Алиса тупит, мы заняты и делаем полезную работу

Ну т.е. по сути 'алгоритм' сервера следующий:

// псевдокод

while (1) {
    if (is_ready(alice_fd)) {
        work_with(alice_fd);
    } else if (is_ready(bob_fd)) {
        work_with(bob_fd);
    } else {
        sleep();
    }
}

Выглядит вроде ок, если клиентов 2. Если клиентов больше, работать с ними будет труднее.

Как вообще is_ready можно реализовать?)

In [11]:
%%writefile example1.c

#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int main() {
    char buffer[123];
    fcntl(0, F_SETFL, O_NONBLOCK);  // Вся магия тут!)
    int len = read(0, buffer, sizeof(buffer));
    printf("%d\n", len);
    return 0;
}
Overwriting example1.c
In [12]:
!gcc example1.c -o example1
!./example1
-1

Если вход будет пустой - будет -1

[danila@archlinux ~/caos/14-async & multithreading]
$ ./example1
-1

Если на вход подать строку - будет ожидаемое поведение

[danila@archlinux ~/caos/14-async & multithreading]
$ ./example1 <<EOF
heredoc> asdadsadadada
heredoc> EOF
14

Причем даже так будет -1))

[danila@archlinux ~/caos/14-async & multithreading]
$ cat | ./example1
-1
asdadsa

Применив флаг O_NONBLOCK мы сделали чтение неблокирующим

Для более удобной работы с несколькими fd было бы удобно иметь механизм по типу такого:

while (1) {
    int ready_fd = get_ready_client();

    work_with(ready_fd);
}

Как же классно, что такой механизм есть!)

epoll¶

NAME

   epoll - I/O event notification facility


SYNOPSIS

   #include <sys/epoll.h>


DESCRIPTION

   The  epoll  API  performs a similar task to poll(2):
   monitoring multiple file descriptors to see  if  I/O
   is  possible  on  any of them.  The epoll API can be
   used either as an edge-triggered  or  a  level-trig‐
   gered  interface and scales well to large numbers of
   watched file descriptors.

Перевод простыми словами

epoll - это инструмент ядра для мониторинга файловых дескрипторов. Он вас проинформирует,
когда один из дескрипторов будет готов для ввода-вывода.

Давайте посмотрим, как с этим работать

In [19]:
%%writefile example2.c

#include <stdio.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <signal.h>
#include <sched.h>

int exit_pipe[2];


void handler(int sig) {
    char msg[] = "123";
    write(exit_pipe[1], msg, sizeof(msg));
}


int main() {
    pipe(exit_pipe);

    dprintf(2, "signal\n");
    struct sigaction sa = {
        .sa_handler = handler,
        .sa_flags = SA_RESTART
    };
    
    sigaction(SIGINT, &sa, NULL);
    
    int epoll_fd = epoll_create1(0);  // no flags
    
    fcntl(exit_pipe[0], F_SETFL, O_NONBLOCK);
    fcntl(0, F_SETFL, O_NONBLOCK);
    
    int fifo_fd = open("test_fifo", O_RDONLY);
    fcntl(fifo_fd, F_SETFL, O_NONBLOCK);
    
    dprintf(2, "epoll_ctl...\n");
    int all_fds[] = {exit_pipe[0], 0, fifo_fd};
    for (int i = 0; i < 3; ++i) {
        struct epoll_event event = {
            .events = EPOLLIN,
            .data.fd = all_fds[i]
        };
        
        // Будем ожидать готовности на чтение
        epoll_ctl(epoll_fd, EPOLL_CTL_ADD, all_fds[i], &event);
    }
    
    dprintf(2, "Main loop\n");
    while (1) {
        struct epoll_event event;
        int N = epoll_wait(epoll_fd, &event, /*maxevents=*/ 1, /*timeout=*/ 1000);
        
        if (N <= 0) {
            sched_yield();
            continue;
        }
        
        int fd = event.data.fd;
        
        if (fd == exit_pipe[0]) {
            break;
        }
        
        char buffer[1024];
        size_t len = read(fd, buffer, sizeof(buffer));
        write(1, buffer, len);
    }
    
    dprintf(2, "Closing all\n");
    
    for (int i = 0; i < 3; ++i) {
        close(all_fds[i]);
    }
    
    close(epoll_fd);
    
    dprintf(2, "Exiting...\n");
    
    return 0;
}
Overwriting example2.c
# term0
[danila@archlinux ~/caos/14-async & multithreading]
$ ./example2
signal
epoll_ctl...
Main loop
1234
1234
abc
^CClosing all
Exiting...
[danila@archlinux ~/caos/14-async & multithreading]
$

# term1
[danila@archlinux ~/caos/14-async & multithreading]
$ cat > test_fifo
abc

Что ещё важно подчеркнуть

  • epoll это просто и удобно (правда)
    • epoll_create1
    • epoll_ctl
    • epoll_wait
  • epoll динамический - он позволяет на лету добавлять fd (через epoll_ctl) и удалять их (просто через close)

Нулевка¶

Сори, она слишком простая (примера выше достаточно чтоб решить и нулевку, и единичку)

Многопоточность¶

Intro¶


Кратенькая шутка про кассирш в КСП


Потоки¶

(перед защитой вспомните, чем они от процессов отличаются)

В Си есть кросплатформенная реализация потоков - через библиотеку pthread

#include <pthread.h>

void *worker(void *args) {
    /// ...
}

int main() {
    pthread_t thread;

    int my_int_arg;

    pthread_create(
        &thread,
        NULL /*init and destroy thread attributes*/,
        worker,
        (void*)(&my_int_arg)
    );

    // ...

    pthread_join(&thread, NULL /*retval*/);
}

Поскольку в задачах просят создавать ПОТОКИ, а не ПРОЦЕССЫ, используйте их))

(Не)атомарность¶

Единственная сложность в многопоточке - сделать так, чтоб разные потоки/процессы друг другу не мешали при работе с одними и теми же данными

Одно из важнейших свойств инструкций (ощутите проблему) - атомарность

Атомарная операция/инструкция - операция, которая не может быть прервана. Она либо выполняется до конца, либо не выполняется вообще

Очевидный пример неатомарной операции

  • Копирование большого куска данных
  • Вообще почти любая операция, которую вы назовете

Атомарные операции

  • НЕКОТОРЫЕ операции чтения
  • НЕКОТОРЫЕ операции записи
  • CAS (Compare And Swap) инструкции

Атомарная инструкция или нет - определяется ещё при проектировании ПРОЦЕССОРА.

Чтоб простые разработчики (мы с вами) могли с этим как-то жить и работать, при портировании стандартных библиотек портируют различные примитивы - atomics, mutex, condvars, ...

(более подробно будет на курсе Ромы Липовского в следующем семе)

Atomic (атомик) - тип данных, поддерживающий атомарные операции над собой (остальное позже)

ВАЖНО ДЛЯ ПОНИМАНИЯ

Если вы вызываете атомарную функцию, это не значит, что ЭТА КОНКРЕТНАЯ СТРОЧКА атомарна

Пример cas-инструкции - lock-free добавление элемента в list)

// не атомарно
item_t *new_node = malloc(...);

do {
    // тоже не атомарно
    new_node->next = head;

    // опять не атомарно, но где-то там есть что-то атомарное
} while (!atomic_compare_exchange_weak(&head, &new_node->next, (_Atomic item_t*)new_node));

(ниже ещё раз разберем этот пример)

(пока просто запомните, что атомики существуют)

Mutex¶

Ладно, окей, а что если надо получать доступ к большим областям памяти?

Тут атомики не помогут

Да, но тут помогают примитивы синхронизации

Пример - два потока пытаются одновременно работать с каким-то разделяемым ресурсом (сокетом, файлом, пользователем, большим куском данных)

Чтоб защитить разделяемый ресурс используют mutex.

Два потока пытаются захватить mutex. Кто захватил mutex - тот и работает с ресурсом

Аналогия - флажок

Флажок один. Что угодно делай - он останется один единственный

Флажком может владеть либо какой-то один процесс (процесс держит флажок в руках), либо никто (флажок стоит на постаменте)

Пример

In [23]:
%%writefile example3.c

// without mutex

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

int global_int;

void *worker(void *arg) {
    int *gi = (int*)arg;
    
    for (int i = 0; i < 1000; ++i) {
        int old = *gi;
        int new = old + 1;
        *gi = new;
    }
}

int main() {
    global_int = 0;
    pthread_t threads[2];
    
    pthread_create(threads + 0, NULL, worker, (void*)(&global_int));
    pthread_create(threads + 1, NULL, worker, (void*)(&global_int));
    
    pthread_join(threads[0], NULL);
    pthread_join(threads[1], NULL);
    
    printf("%d\n", global_int);
    
    return 0;
}
Overwriting example3.c
In [24]:
!gcc example3.c -o example3
!./example3
1980

Выше НЕ ВСЕГДА 2000

Иногда вам не повезет


Касательно слов всегда, иногда и прочих

Когда размышляете над многопоточным кодом, (в уме) доказывайте 2 вещи:

  • Никогда не случится ничего плохого
  • Когда-нибудь случится что-нибудь хорошее
In [25]:
%%writefile example4.c

// with mutex

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

pthread_mutex_t mutex;
int global_int;

void *worker(void *arg) {
    int *gi = (int*)arg;
    
    for (int i = 0; i < 1000; ++i) {
        pthread_mutex_lock(&mutex);
        
        int old = *gi;
        int new = old + 1;
        *gi = new;
        
        pthread_mutex_unlock(&mutex);
    }
}

int main() {
    global_int = 0;
    pthread_t threads[2];
    
    pthread_create(threads + 0, NULL, worker, (void*)(&global_int));
    pthread_create(threads + 1, NULL, worker, (void*)(&global_int));
    
    pthread_join(threads[0], NULL);
    pthread_join(threads[1], NULL);
    
    printf("%d\n", global_int);
    
    return 0;
}
Writing example4.c
In [26]:
!gcc example4.c -o example4
!./example4
2000

Код стал работать ДОЛЬШЕ, но стал гарантировать ответ 2000

Condvar (conditional variable)¶

Разгоняем телегу с глупыми и нет аналогиями))

Допустим вы работали себе, никого не трогали.

Делигировали какую-нибудь сложную задачу другому потоку (факторизация числа or smth).

Продолжили работать.

Внезапно дошли до момента, когда вам понадобился результат.

Вопросы: Как его забрать?) Как понять что результат готов? Как понять, что это новый результат, а не старый?

Вопрос со звездочкой: А что если вместе с вами кто-то ещё ждет результат этой же работы?

Хорошее решение - лечь спать и попросить разбудить)

Вот для решения такой проблемы служит condvar


Ещё что важно знать - condvar-у для работы нужен залоченый mutex

+-Очевидно, если помнить, что:

  1. Вы делите данные с другими потоками
  2. Кроме вас готовности этих данных могут ждать ещё несколько потоков

Пример:

In [35]:
%%writefile example5.c

#include <stdio.h>
#include <stdlib.h>

#include <pthread.h>
#include <unistd.h>

typedef struct {
  pthread_mutex_t *mutex;
  pthread_cond_t *condvar;

  int64_t *next_out;
} gen_args_t;


void *do_work(void *args) {
  gen_args_t *params = (gen_args_t*)args;

  for (int64_t i=0; ; ++i) {
    pthread_mutex_lock(params->mutex);
    *params->next_out = i;

    pthread_cond_signal(params->condvar);

    while (*params->next_out != -1)
        pthread_cond_wait(params->condvar, params->mutex);

    pthread_mutex_unlock(params->mutex);
  }
}

int main(int argc, char **argv) {
  pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  pthread_cond_t condvar = PTHREAD_COND_INITIALIZER;
  int64_t next_out;
  gen_args_t thread_args = {
      .mutex = &mutex,
      .condvar = &condvar,
      .next_out = &next_out,
  };

  pthread_t worker;
  pthread_create(&worker, NULL, do_work, &thread_args);

  next_out = -1;
  const int N = 10;
  for (int32_t count = 0; count < N; ++count) {
    pthread_mutex_lock(&mutex);
      
    while (next_out == -1)
      pthread_cond_wait(&condvar, &mutex);

    int64_t cur_out = next_out;

    next_out = -1;
    pthread_mutex_unlock(&mutex);
  
    if (count != N - 1)
      pthread_cond_signal(&condvar);

    printf("%lld\n", cur_out);
  }

  pthread_cancel(worker);

  pthread_cond_destroy(&condvar);
  pthread_mutex_destroy(&mutex);
  return 0;
}
Overwriting example5.c
In [36]:
!gcc example5.c -o example5
!./example5
0
1
2
3
4
5
6
7
8
9

Lock-free¶

Сейчас давайте вернемся к понятию атомарных операций

Операция атомарна, если она не может быть прервана

Нужные для решения 3-ки функции (на самом деле функция) описаны тут

Вроде рабочий код с примером использования Atomic-ов есть тут


Думаю полезнее будет обсудить, как написать mutex на псевдокоде

Основной инструмент - CAS

Compare And Save operation

CAS(atomic_ptr, atomic_expected, value_if_true, value_if_false) -> returns true on swap